import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.GroupReduceFunction;


/**
 * Group-reduce function that de-duplicates the string values ({@code f1}) of a group.
 *
 * <p>Each distinct {@code f1} value seen in the group is emitted exactly once, paired with a key
 * taken from the group's tuples ({@code f0}).
 */
public class DistinctReduce implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {

    /**
     * Collects the distinct {@code f1} values of {@code in} and emits one tuple per distinct value.
     *
     * <p>If {@code in} yields no elements, nothing is emitted. The order in which the distinct
     * values are emitted follows the iteration order of a {@link HashSet} and is therefore
     * unspecified.
     *
     * @param in  the tuples of one group
     * @param out collector that receives one {@code (key, value)} tuple per distinct value
     */
    @Override
    public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
        Set<String> uniqStrings = new HashSet<String>();
        Integer key = null;

        // Gather every string of the group into the set; duplicates collapse automatically.
        // NOTE(review): the key of the last tuple iterated is the one used for all output records.
        // Presumably the input is grouped by f0, so all tuples share the same key - confirm
        // against the caller.
        for (Tuple2<Integer, String> t : in) {
            key = t.f0;
            uniqStrings.add(t.f1);
        }

        // Emit each distinct string once, re-attached to the key. Collecting does not end the
        // method; it only hands a record to the downstream operator.
        for (String s : uniqStrings) {
            out.collect(new Tuple2<Integer, String>(key, s));
        }
    }
}